diff options
author | Brinda Santh <bs2796@att.com> | 2019-12-16 20:59:41 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2019-12-16 20:59:41 -0500 |
commit | 383235b495c32a1762511f1837bc9e98af6226eb (patch) | |
tree | afe0e093f4756b2f82679038a3029c515082c9e2 /ms/blueprintsprocessor/modules/commons/atomix-lib/src | |
parent | 73a37ecd64accefc0e4b8a9db2cb9e0127d94408 (diff) |
Cluster distributed data store
Add experimental cluster co-ordination service using Atomic framework.
Included distributed data store creation utilities.
Sample docker compose data cluster between cds controller and resource-resolution instances.
Issue-ID: CCSDK-2000
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib/src')
6 files changed, 372 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt new file mode 100644 index 000000000..8ea15935f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt @@ -0,0 +1,31 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.context.annotation.Configuration + +@Configuration +open class BluePrintAtomixLibConfiguration + +/** + * Exposed Dependency Service by this Atomix Lib Module + */ +fun BluePrintDependencyService.clusterService(): BluePrintClusterService = + instance(AtomixBluePrintClusterService::class) diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt new file mode 100644 index 000000000..696d728dd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt @@ -0,0 +1,25 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import io.atomix.core.map.DistributedMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> { + return if (this != null && this is DistributedMap<*, *>) this + else throw BluePrintProcessorException("map is not of type DistributedMap") +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt new file mode 100644 index 000000000..27921be9d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -0,0 +1,118 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service + +import io.atomix.cluster.ClusterMembershipEvent +import io.atomix.core.Atomix +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.CompletableFuture + +@Service +open class AtomixBluePrintClusterService : BluePrintClusterService { + + private val log = logger(AtomixBluePrintClusterService::class) + + lateinit var atomix: Atomix + + private var joined = false + + override suspend fun startCluster(clusterInfo: ClusterInfo) { + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "starting with members(${clusterInfo.clusterMembers})" + ) + + /** Create Atomix cluster either from config file or default multi-cast cluster*/ + atomix = if (!clusterInfo.configFile.isNullOrEmpty()) { + AtomixLibUtils.configAtomix(clusterInfo.configFile!!) + } else { + AtomixLibUtils.defaultMulticastAtomix(clusterInfo) + } + + /** Listen for the member chaneg events */ + atomix.membershipService.addListener { membershipEvent -> + when (membershipEvent.type()) { + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") + else -> log.info("***** Member event unknown") + } + } + atomix.start().join() + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "created successfully...." + ) + + /** Receive ping from network */ + val pingHandler = { message: String -> + log.info("####### ping message received : $message") + CompletableFuture.completedFuture(message) + } + atomix.communicationService.subscribe("ping", pingHandler) + + /** Ping the network */ + atomix.communicationService.broadcast( + "ping", + "ping from node(${clusterInfo.nodeId})" + ) + joined = true + } + + override fun clusterJoined(): Boolean { + return joined + } + + override suspend fun allMembers(): Set<ClusterMember> { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { + ClusterMember( + id = it.id().id(), + memberAddress = it.host() + ) + }.toSet() + } + + override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + + return atomix.membershipService.members.filter { + it.id().id().startsWith(memberPrefix, true) + }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) } + .toSet() + } + + override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + return AtomixLibUtils.distributedMapStore<T>(atomix, name) + } + + override suspend fun shutDown(duration: Duration) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt new file mode 100644 index 000000000..6e726a1a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.MissingNode +import com.fasterxml.jackson.databind.node.NullNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.atomix.core.Atomix +import io.atomix.core.map.DistributedMap +import io.atomix.protocols.backup.MultiPrimaryProtocol +import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup +import io.atomix.protocols.raft.partition.RaftPartitionGroup +import io.atomix.utils.net.Address +import org.jsoup.nodes.TextNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + +object AtomixLibUtils { + + fun configAtomix(filePath: String): Atomix { + val configFile = normalizedFile(filePath) + return Atomix.builder(configFile.absolutePath).build() + } + + fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + + val nodeId = clusterInfo.nodeId + + val raftPartitionGroup = RaftPartitionGroup.builder("system") + .withNumPartitions(7) + .withMembers(clusterInfo.clusterMembers) + .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) + .build() + + val primaryBackupGroup = + PrimaryBackupPartitionGroup.builder("data") + .withNumPartitions(31) + .build() + + return Atomix.builder() + .withMemberId(nodeId) + .withAddress(Address.from(clusterInfo.nodeAddress)) + .withManagementGroup(raftPartitionGroup) + .withPartitionGroups(primaryBackupGroup) + .withMulticastEnabled() + .build() + } + + fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(2) + .build() + + return atomix.mapBuilder<String, T>(storeName) + .withProtocol(protocol) + .withCacheEnabled() + .withValueType(JsonNode::class.java) + .withExtraTypes( + JsonNode::class.java, TextNode::class.java, ObjectNode::class.java, + ArrayNode::class.java, NullNode::class.java, MissingNode::class.java + ) + .build() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt new file mode 100644 index 000000000..919d6712b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import kotlin.test.assertNotNull + +class AtomixBluePrintClusterServiceTest { + val log = logger(AtomixBluePrintClusterServiceTest::class) + + @Before + fun init() { + runBlocking { + deleteNBDir("target/cluster") + } + } + + /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/ + @Test + fun testClusterJoin() { + runBlocking { + val members = arrayListOf("node-5679", "node-5680") + val deferred = arrayListOf(5679, 5680).map { port -> + async(Dispatchers.IO) { + val nodeId = "node-$port" + log.info("********** Starting node($nodeId) on port($port)") + val clusterInfo = ClusterInfo( + id = "test-cluster", nodeId = nodeId, + clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + ) + val atomixClusterService = AtomixBluePrintClusterService() + atomixClusterService.startCluster(clusterInfo) + atomixClusterService.atomix + } + } + val atomix = deferred.awaitAll() + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + delay(100) + store.close() + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml new file mode 100644 index 000000000..f1c625e8f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ +<!-- + ~ Copyright © 2017-2018 AT&T Intellectual Property. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="org.springframework.test" level="warn"/> + <logger name="org.springframework" level="warn"/> + <logger name="org.hibernate" level="info"/> + <logger name="io.atomix" level="warn"/> + <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> + + <root level="warn"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> |