diff options
author | Brinda Santh <bs2796@att.com> | 2020-02-05 15:51:03 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2020-02-12 14:16:28 -0500 |
commit | 65bb9d0d83762e8fa8e3ab568c801908eafa0686 (patch) | |
tree | ad8ae7fafc1954b44ddd9b72d1fceb482f042431 /ms/blueprintsprocessor/modules/commons/atomix-lib | |
parent | 723cb0b0f4fca052561f21bb8312bf7c6e8cd524 (diff) |
Cluster co-ordination with Hazelcast.
Remove Atomix implementation, due to Kubernetes clustering issues.
Cluster environment property changes.
Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I23f40c92c0adc6b3ab8690871385f78525c76433
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib')
7 files changed, 0 insertions, 607 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml deleted file mode 100644 index 7fa7b452a..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Copyright © 2018-2019 AT&T Intellectual Property. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> - <artifactId>commons</artifactId> - <version>0.7.0-SNAPSHOT</version> - </parent> - - <artifactId>atomix-lib</artifactId> - <packaging>jar</packaging> - - <name>Blueprints Processor Atomix Lib</name> - <description>Blueprints Processor Atomix Lib</description> - - <dependencies> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-raft</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-primary-backup</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-gossip</artifactId> - </dependency> - - <dependency> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> - <artifactId>db-lib</artifactId> - </dependency> - </dependencies> -</project> diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt deleted file mode 100644 index 8ef290303..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix - -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -import org.springframework.context.annotation.Configuration - -@Configuration -open class BluePrintAtomixLibConfiguration - -/** - * Exposed Dependency Service by this Atomix Lib Module - */ -fun BluePrintDependencyService.clusterService(): BluePrintClusterService = - instance(AtomixBluePrintClusterService::class) - -/** Optional Cluster Service, returns only if Cluster is enabled */ -fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { - return if (BluePrintConstants.CLUSTER_ENABLED) { - BluePrintDependencyService.clusterService() - } else null -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt deleted file mode 100644 index 17d243620..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix - -import com.fasterxml.jackson.databind.JsonNode -import io.atomix.core.map.DistributedMap -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException - -fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> { - return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode> - else throw BluePrintProcessorException("map is not of type DistributedMap") -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt deleted file mode 100644 index 214a14310..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service - -import io.atomix.cluster.ClusterMembershipEvent -import io.atomix.core.Atomix -import io.atomix.core.lock.DistributedLock -import kotlinx.coroutines.delay -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.springframework.stereotype.Service -import java.time.Duration -import java.util.concurrent.CompletableFuture - -@Service -open class AtomixBluePrintClusterService : BluePrintClusterService { - - private val log = logger(AtomixBluePrintClusterService::class) - - lateinit var atomix: Atomix - - override suspend fun startCluster(clusterInfo: ClusterInfo) { - log.info( - "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + - "starting with members(${clusterInfo.clusterMembers})" - ) - - /** Create Atomix cluster either from config file or default multi-cast cluster*/ - atomix = if (!clusterInfo.configFile.isNullOrEmpty()) { - AtomixLibUtils.configAtomix(clusterInfo.configFile!!) - } else { - AtomixLibUtils.defaultMulticastAtomix(clusterInfo) - } - - /** Listen for the member chaneg events */ - atomix.membershipService.addListener { membershipEvent -> - when (membershipEvent.type()) { - ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") - else -> log.info("Member event unknown") - } - } - /** Start and Join the Cluster */ - atomix.start().join() - log.info( - "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + - "created successfully...." - ) - - /** Receive ping from network */ - val pingHandler = { message: String -> - log.info("####### ping message received : $message") - CompletableFuture.completedFuture(message) - } - atomix.communicationService.subscribe("ping", pingHandler) - - /** Ping the network */ - atomix.communicationService.broadcast( - "ping", - "ping from node(${clusterInfo.nodeId})" - ) - } - - override fun clusterJoined(): Boolean { - return atomix.isRunning - } - - override suspend fun masterMember(partitionGroup: String): ClusterMember { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - val masterId = atomix.partitionService - .getPartitionGroup(partitionGroup) - .getPartition("1").primary() - val masterMember = atomix.membershipService.getMember(masterId) - return ClusterMember( - id = masterMember.id().id(), - memberAddress = masterMember.address().toString() - ) - } - - override suspend fun allMembers(): Set<ClusterMember> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - - return atomix.membershipService.members.map { - ClusterMember( - id = it.id().id(), - memberAddress = it.address().toString() - ) - }.toSet() - } - - override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - - return atomix.membershipService.members.filter { - it.id().id().startsWith(memberPrefix, true) - }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) } - .toSet() - } - - override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - return AtomixLibUtils.distributedMapStore<T>(atomix, name) - } - - /** The DistributedLock is a distributed implementation of Java’s Lock. - * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to - * determine ordering of multiple concurrent lock holders. - * DistributedLocks are designed to account for failures within the cluster. - * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, - * the lock will be released and granted to the next waiting process. * - */ - override suspend fun clusterLock(name: String): ClusterLock { - check(::atomix.isInitialized) { "failed to start and join cluster" } - return ClusterLockImpl(atomix, name) - } - - override suspend fun shutDown(duration: Duration) { - if (::atomix.isInitialized) { - val shutDownMilli = duration.toMillis() - log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") - delay(shutDownMilli) - atomix.stop() - } - } -} - -open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { - val log = logger(ClusterLockImpl::class) - - lateinit var distributedLock: DistributedLock - - override fun name(): String { - return distributedLock.name() - } - - override suspend fun lock() { - distributedLock = AtomixLibUtils.distributedLock(atomix, name) - distributedLock.lock() - log.debug("Cluster lock($name) created..") - } - - override suspend fun tryLock(timeout: Long): Boolean { - distributedLock = AtomixLibUtils.distributedLock(atomix, name) - return distributedLock.tryLock(Duration.ofMillis(timeout)) - } - - override suspend fun unLock() { - distributedLock.unlock() - log.debug("Cluster unlock(${name()}) successfully..") - } - - override fun isLocked(): Boolean { - return distributedLock.isLocked - } - - override fun close() { - if (::distributedLock.isInitialized) { - distributedLock.close() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt deleted file mode 100644 index 9be15f2e3..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils - -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ArrayNode -import com.fasterxml.jackson.databind.node.MissingNode -import com.fasterxml.jackson.databind.node.NullNode -import com.fasterxml.jackson.databind.node.ObjectNode -import io.atomix.core.Atomix -import io.atomix.core.lock.AtomicLock -import io.atomix.core.lock.DistributedLock -import io.atomix.core.map.DistributedMap -import io.atomix.protocols.backup.MultiPrimaryProtocol -import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup -import io.atomix.protocols.raft.partition.RaftPartitionGroup -import io.atomix.utils.net.Address -import org.jsoup.nodes.TextNode -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile - -object AtomixLibUtils { - private val log = logger(AtomixLibUtils::class) - - fun configAtomix(filePath: String): Atomix { - val configFile = normalizedFile(filePath) - return Atomix.builder(configFile.absolutePath).build() - } - - fun defaultMulticastAtomix( - clusterInfo: ClusterInfo, - raftPartitions: Int = 1, - primaryBackupPartitions: Int = 32 - ): Atomix { - - val nodeId = clusterInfo.nodeId - - val raftPartitionGroup = RaftPartitionGroup.builder("system") - .withNumPartitions(raftPartitions) - .withMembers(clusterInfo.clusterMembers) - .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) - .build() - - val primaryBackupGroup = - PrimaryBackupPartitionGroup.builder("data") - .withNumPartitions(primaryBackupPartitions) - .build() - - return Atomix.builder() - .withMemberId(nodeId) - .withAddress(Address.from(clusterInfo.nodeAddress)) - .withManagementGroup(raftPartitionGroup) - .withPartitionGroups(primaryBackupGroup) - .withMulticastEnabled() - .build() - } - - fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> { - check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - - return atomix.mapBuilder<String, T>(storeName) - .withProtocol(protocol) - .withCacheEnabled() - .withValueType(JsonNode::class.java) - .withExtraTypes( - JsonNode::class.java, TextNode::class.java, ObjectNode::class.java, - ArrayNode::class.java, NullNode::class.java, MissingNode::class.java - ) - .build() - } - - fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { - check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - return atomix.lockBuilder(lockName) - .withProtocol(protocol) - .build() - } - - /** get Atomic distributed lock, to get lock fence information */ - fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock { - check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - - return atomix.atomicLockBuilder(lockName) - .withProtocol(protocol) - .build() - } -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt deleted file mode 100644 index 67bf4cabb..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix - -import com.fasterxml.jackson.databind.JsonNode -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import org.junit.Before -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive -import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class AtomixBluePrintClusterServiceTest { - private val log = logger(AtomixBluePrintClusterServiceTest::class) - - @Before - fun init() { - runBlocking { - deleteNBDir("target/cluster") - } - } - - /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/ - @Test - fun testClusterJoin() { - runBlocking { - val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680)).toMutableList() - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680)) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) - val bluePrintClusterService = bluePrintClusterServiceOne[0] - log.info("Members : ${bluePrintClusterService.allMembers()}") - log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") - log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") - testDistributedStore(bluePrintClusterServiceOne) - testDistributedLock(bluePrintClusterServiceOne) - } - } - - private suspend fun createCluster( - ports: List<Int>, - otherClusterPorts: List<Int>? = null - ): List<BluePrintClusterService> { - - return withContext(Dispatchers.Default) { - val clusterMembers = ports.map { "node-$it" }.toMutableList() - /** Add the other cluster as members */ - if (!otherClusterPorts.isNullOrEmpty()) { - val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList() - clusterMembers.addAll(otherClusterMembers) - } - val deferred = ports.map { port -> - async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") - val clusterInfo = ClusterInfo( - id = "test-cluster", nodeId = nodeId, - clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster" - ) - val atomixClusterService = AtomixBluePrintClusterService() - atomixClusterService.startCluster(clusterInfo) - atomixClusterService - } - } - deferred.awaitAll() - } - } - - private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>( - "blueprint-runtime-$storeId" - ).toDistributedMap() - assertNotNull(store, "failed to get store") - val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>( - "blueprint-runtime-$storeId" - ).toDistributedMap() - - store1.addListener { - log.info("Received map event : $it") - } - repeat(5) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - delay(10) - store.close() - } - } - - private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) { - val lockName = "sample-lock" - withContext(Dispatchers.IO) { - val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) - } - val deferred2 = async { - executeLock(bluePrintClusterServices[0], "second", lockName) - } - val deferred3 = async { - executeLock(bluePrintClusterServices[1], "third", lockName) - } - deferred.start() - deferred2.start() - deferred3.start() - } - } - - private suspend fun executeLock( - bluePrintClusterService: BluePrintClusterService, - lockId: String, - lockName: String - ) { - log.info("initialising $lockId lock...") - val distributedLock = bluePrintClusterService.clusterLock(lockName) - assertNotNull(distributedLock, "failed to create distributed $lockId lock") - distributedLock.lock() - assertTrue(distributedLock.isLocked(), "failed to lock $lockId") - try { - log.info("locked $lockId process for 5mSec") - delay(5) - } finally { - distributedLock.unLock() - log.info("$lockId lock released") - } - distributedLock.close() - } -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml deleted file mode 100644 index 016d48636..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml +++ /dev/null @@ -1,36 +0,0 @@ -<!-- - ~ Copyright © 2017-2018 AT&T Intellectual Property. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <!-- encoders are assigned the type - ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n</pattern> - </encoder> - </appender> - - <logger name="org.springframework.test" level="warn"/> - <logger name="org.springframework" level="warn"/> - <logger name="org.hibernate" level="info"/> - <logger name="io.atomix" level="warn"/> - <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> - - <root level="warn"> - <appender-ref ref="STDOUT"/> - </root> - -</configuration> |