From 383235b495c32a1762511f1837bc9e98af6226eb Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 16 Dec 2019 20:59:41 -0500 Subject: Cluster distributed data store Add experimental cluster co-ordination service using Atomic framework. Included distributed data store creation utilities. Sample docker compose data cluster between cds controller and resource-resolution instances. Issue-ID: CCSDK-2000 Signed-off-by: Brinda Santh Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883 --- .../modules/commons/atomix-lib/pom.xml | 56 ++++++++++ .../atomix/BluePrintAtomixLibConfiguration.kt | 31 ++++++ .../atomix/BluePrintAtomixLibExtensions.kt | 25 +++++ .../service/AtomixBluePrintClusterService.kt | 118 +++++++++++++++++++++ .../atomix/utils/AtomixLibUtils.kt | 82 ++++++++++++++ .../atomix/AtomixBluePrintClusterServiceTest.kt | 80 ++++++++++++++ .../atomix-lib/src/test/resources/logback-test.xml | 36 +++++++ ms/blueprintsprocessor/modules/commons/pom.xml | 1 + .../core/service/BluePrintClusterService.kt | 51 +++++++++ 9 files changed, 480 insertions(+) create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt (limited to 'ms/blueprintsprocessor/modules/commons') diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml new file mode 100644 index 000000000..7fa7b452a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml @@ -0,0 +1,56 @@ + + + + + 4.0.0 + + + org.onap.ccsdk.cds.blueprintsprocessor + commons + 0.7.0-SNAPSHOT + + + atomix-lib + jar + + Blueprints Processor Atomix Lib + Blueprints Processor Atomix Lib + + + + io.atomix + atomix + + + io.atomix + atomix-raft + + + io.atomix + atomix-primary-backup + + + io.atomix + atomix-gossip + + + + org.onap.ccsdk.cds.blueprintsprocessor + db-lib + + + diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt new file mode 100644 index 000000000..8ea15935f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt @@ -0,0 +1,31 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.context.annotation.Configuration + +@Configuration +open class BluePrintAtomixLibConfiguration + +/** + * Exposed Dependency Service by this Atomix Lib Module + */ +fun BluePrintDependencyService.clusterService(): BluePrintClusterService = + instance(AtomixBluePrintClusterService::class) diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt new file mode 100644 index 000000000..696d728dd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt @@ -0,0 +1,25 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import io.atomix.core.map.DistributedMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +fun > T.toDistributedMap(): DistributedMap<*, *> { + return if (this != null && this is DistributedMap<*, *>) this + else throw BluePrintProcessorException("map is not of type DistributedMap") +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt new file mode 100644 index 000000000..27921be9d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -0,0 +1,118 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service + +import io.atomix.cluster.ClusterMembershipEvent +import io.atomix.core.Atomix +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.CompletableFuture + +@Service +open class AtomixBluePrintClusterService : BluePrintClusterService { + + private val log = logger(AtomixBluePrintClusterService::class) + + lateinit var atomix: Atomix + + private var joined = false + + override suspend fun startCluster(clusterInfo: ClusterInfo) { + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "starting with members(${clusterInfo.clusterMembers})" + ) + + /** Create Atomix cluster either from config file or default multi-cast cluster*/ + atomix = if (!clusterInfo.configFile.isNullOrEmpty()) { + AtomixLibUtils.configAtomix(clusterInfo.configFile!!) + } else { + AtomixLibUtils.defaultMulticastAtomix(clusterInfo) + } + + /** Listen for the member chaneg events */ + atomix.membershipService.addListener { membershipEvent -> + when (membershipEvent.type()) { + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") + else -> log.info("***** Member event unknown") + } + } + atomix.start().join() + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "created successfully...." + ) + + /** Receive ping from network */ + val pingHandler = { message: String -> + log.info("####### ping message received : $message") + CompletableFuture.completedFuture(message) + } + atomix.communicationService.subscribe("ping", pingHandler) + + /** Ping the network */ + atomix.communicationService.broadcast( + "ping", + "ping from node(${clusterInfo.nodeId})" + ) + joined = true + } + + override fun clusterJoined(): Boolean { + return joined + } + + override suspend fun allMembers(): Set { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { + ClusterMember( + id = it.id().id(), + memberAddress = it.host() + ) + }.toSet() + } + + override suspend fun clusterMembersForPrefix(memberPrefix: String): Set { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + + return atomix.membershipService.members.filter { + it.id().id().startsWith(memberPrefix, true) + }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) } + .toSet() + } + + override suspend fun clusterMapStore(name: String): MutableMap { + return AtomixLibUtils.distributedMapStore(atomix, name) + } + + override suspend fun shutDown(duration: Duration) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt new file mode 100644 index 000000000..6e726a1a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.MissingNode +import com.fasterxml.jackson.databind.node.NullNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.atomix.core.Atomix +import io.atomix.core.map.DistributedMap +import io.atomix.protocols.backup.MultiPrimaryProtocol +import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup +import io.atomix.protocols.raft.partition.RaftPartitionGroup +import io.atomix.utils.net.Address +import org.jsoup.nodes.TextNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + +object AtomixLibUtils { + + fun configAtomix(filePath: String): Atomix { + val configFile = normalizedFile(filePath) + return Atomix.builder(configFile.absolutePath).build() + } + + fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + + val nodeId = clusterInfo.nodeId + + val raftPartitionGroup = RaftPartitionGroup.builder("system") + .withNumPartitions(7) + .withMembers(clusterInfo.clusterMembers) + .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) + .build() + + val primaryBackupGroup = + PrimaryBackupPartitionGroup.builder("data") + .withNumPartitions(31) + .build() + + return Atomix.builder() + .withMemberId(nodeId) + .withAddress(Address.from(clusterInfo.nodeAddress)) + .withManagementGroup(raftPartitionGroup) + .withPartitionGroups(primaryBackupGroup) + .withMulticastEnabled() + .build() + } + + fun distributedMapStore(atomix: Atomix, storeName: String): DistributedMap { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(2) + .build() + + return atomix.mapBuilder(storeName) + .withProtocol(protocol) + .withCacheEnabled() + .withValueType(JsonNode::class.java) + .withExtraTypes( + JsonNode::class.java, TextNode::class.java, ObjectNode::class.java, + ArrayNode::class.java, NullNode::class.java, MissingNode::class.java + ) + .build() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt new file mode 100644 index 000000000..919d6712b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import kotlin.test.assertNotNull + +class AtomixBluePrintClusterServiceTest { + val log = logger(AtomixBluePrintClusterServiceTest::class) + + @Before + fun init() { + runBlocking { + deleteNBDir("target/cluster") + } + } + + /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/ + @Test + fun testClusterJoin() { + runBlocking { + val members = arrayListOf("node-5679", "node-5680") + val deferred = arrayListOf(5679, 5680).map { port -> + async(Dispatchers.IO) { + val nodeId = "node-$port" + log.info("********** Starting node($nodeId) on port($port)") + val clusterInfo = ClusterInfo( + id = "test-cluster", nodeId = nodeId, + clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + ) + val atomixClusterService = AtomixBluePrintClusterService() + atomixClusterService.startCluster(clusterInfo) + atomixClusterService.atomix + } + } + val atomix = deferred.awaitAll() + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + delay(100) + store.close() + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml new file mode 100644 index 000000000..f1c625e8f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ + + + + + + + %d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n + + + + + + + + + + + + + + diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml index 30c34ab52..78c569125 100755 --- a/ms/blueprintsprocessor/modules/commons/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/pom.xml @@ -34,6 +34,7 @@ processor-core + atomix-lib db-lib rest-lib dmaap-lib diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt new file mode 100644 index 000000000..bbaa427c9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -0,0 +1,51 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.service + +import java.time.Duration + +interface BluePrintClusterService { + + /** Start the cluster with [clusterInfo], By default clustering service is disabled. + * Application module has to start cluster */ + suspend fun startCluster(clusterInfo: ClusterInfo) + + fun clusterJoined(): Boolean + + /** Returns all the data cluster members */ + suspend fun allMembers(): Set + + /** Returns data cluster members starting with prefix */ + suspend fun clusterMembersForPrefix(memberPrefix: String): Set + + /** Create and get or get the distributed data map store with [name] */ + suspend fun clusterMapStore(name: String): MutableMap + + /** Shut down the cluster with [duration] */ + suspend fun shutDown(duration: Duration) +} + +data class ClusterInfo( + val id: String, + var configFile: String? = null, + val nodeId: String, + val nodeAddress: String, + var clusterMembers: List, + var storagePath: String +) + +data class ClusterMember(val id: String, val memberAddress: String?) -- cgit 1.2.3-korg