aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/atomix-lib/src
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-12-16 20:59:41 -0500
committerBrinda Santh <bs2796@att.com>2019-12-16 20:59:41 -0500
commit383235b495c32a1762511f1837bc9e98af6226eb (patch)
treeafe0e093f4756b2f82679038a3029c515082c9e2 /ms/blueprintsprocessor/modules/commons/atomix-lib/src
parent73a37ecd64accefc0e4b8a9db2cb9e0127d94408 (diff)
Cluster distributed data store
Add experimental cluster co-ordination service using Atomic framework. Included distributed data store creation utilities. Sample docker compose data cluster between cds controller and resource-resolution instances. Issue-ID: CCSDK-2000 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt31
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt25
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt118
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt82
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt80
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml36
6 files changed, 372 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
new file mode 100644
index 000000000..8ea15935f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+open class BluePrintAtomixLibConfiguration
+
+/**
+ * Exposed Dependency Service by this Atomix Lib Module
+ */
+fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
+ instance(AtomixBluePrintClusterService::class)
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
new file mode 100644
index 000000000..696d728dd
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import io.atomix.core.map.DistributedMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> {
+ return if (this != null && this is DistributedMap<*, *>) this
+ else throw BluePrintProcessorException("map is not of type DistributedMap")
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
new file mode 100644
index 000000000..27921be9d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
+
+import io.atomix.cluster.ClusterMembershipEvent
+import io.atomix.core.Atomix
+import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+import java.time.Duration
+import java.util.concurrent.CompletableFuture
+
+@Service
+open class AtomixBluePrintClusterService : BluePrintClusterService {
+
+ private val log = logger(AtomixBluePrintClusterService::class)
+
+ lateinit var atomix: Atomix
+
+ private var joined = false
+
+ override suspend fun startCluster(clusterInfo: ClusterInfo) {
+ log.info(
+ "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+ "starting with members(${clusterInfo.clusterMembers})"
+ )
+
+ /** Create Atomix cluster either from config file or default multi-cast cluster*/
+ atomix = if (!clusterInfo.configFile.isNullOrEmpty()) {
+ AtomixLibUtils.configAtomix(clusterInfo.configFile!!)
+ } else {
+ AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
+ }
+
+ /** Listen for the member chaneg events */
+ atomix.membershipService.addListener { membershipEvent ->
+ when (membershipEvent.type()) {
+ ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
+ ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
+ ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
+ else -> log.info("***** Member event unknown")
+ }
+ }
+ atomix.start().join()
+ log.info(
+ "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+ "created successfully...."
+ )
+
+ /** Receive ping from network */
+ val pingHandler = { message: String ->
+ log.info("####### ping message received : $message")
+ CompletableFuture.completedFuture(message)
+ }
+ atomix.communicationService.subscribe("ping", pingHandler)
+
+ /** Ping the network */
+ atomix.communicationService.broadcast(
+ "ping",
+ "ping from node(${clusterInfo.nodeId})"
+ )
+ joined = true
+ }
+
+ override fun clusterJoined(): Boolean {
+ return joined
+ }
+
+ override suspend fun allMembers(): Set<ClusterMember> {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ check(atomix.isRunning) { "cluster is not running" }
+ return atomix.membershipService.members.map {
+ ClusterMember(
+ id = it.id().id(),
+ memberAddress = it.host()
+ )
+ }.toSet()
+ }
+
+ override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ check(atomix.isRunning) { "cluster is not running" }
+
+ return atomix.membershipService.members.filter {
+ it.id().id().startsWith(memberPrefix, true)
+ }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
+ .toSet()
+ }
+
+ override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ return AtomixLibUtils.distributedMapStore<T>(atomix, name)
+ }
+
+ override suspend fun shutDown(duration: Duration) {
+ val shutDownMilli = duration.toMillis()
+ log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+ delay(shutDownMilli)
+ atomix.stop()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
new file mode 100644
index 000000000..6e726a1a6
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.node.MissingNode
+import com.fasterxml.jackson.databind.node.NullNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import io.atomix.core.Atomix
+import io.atomix.core.map.DistributedMap
+import io.atomix.protocols.backup.MultiPrimaryProtocol
+import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
+import io.atomix.protocols.raft.partition.RaftPartitionGroup
+import io.atomix.utils.net.Address
+import org.jsoup.nodes.TextNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+
+object AtomixLibUtils {
+
+ fun configAtomix(filePath: String): Atomix {
+ val configFile = normalizedFile(filePath)
+ return Atomix.builder(configFile.absolutePath).build()
+ }
+
+ fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+
+ val nodeId = clusterInfo.nodeId
+
+ val raftPartitionGroup = RaftPartitionGroup.builder("system")
+ .withNumPartitions(7)
+ .withMembers(clusterInfo.clusterMembers)
+ .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
+ .build()
+
+ val primaryBackupGroup =
+ PrimaryBackupPartitionGroup.builder("data")
+ .withNumPartitions(31)
+ .build()
+
+ return Atomix.builder()
+ .withMemberId(nodeId)
+ .withAddress(Address.from(clusterInfo.nodeAddress))
+ .withManagementGroup(raftPartitionGroup)
+ .withPartitionGroups(primaryBackupGroup)
+ .withMulticastEnabled()
+ .build()
+ }
+
+ fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(2)
+ .build()
+
+ return atomix.mapBuilder<String, T>(storeName)
+ .withProtocol(protocol)
+ .withCacheEnabled()
+ .withValueType(JsonNode::class.java)
+ .withExtraTypes(
+ JsonNode::class.java, TextNode::class.java, ObjectNode::class.java,
+ ArrayNode::class.java, NullNode::class.java, MissingNode::class.java
+ )
+ .build()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
new file mode 100644
index 000000000..919d6712b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import kotlin.test.assertNotNull
+
+class AtomixBluePrintClusterServiceTest {
+ val log = logger(AtomixBluePrintClusterServiceTest::class)
+
+ @Before
+ fun init() {
+ runBlocking {
+ deleteNBDir("target/cluster")
+ }
+ }
+
+ /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
+ @Test
+ fun testClusterJoin() {
+ runBlocking {
+ val members = arrayListOf("node-5679", "node-5680")
+ val deferred = arrayListOf(5679, 5680).map { port ->
+ async(Dispatchers.IO) {
+ val nodeId = "node-$port"
+ log.info("********** Starting node($nodeId) on port($port)")
+ val clusterInfo = ClusterInfo(
+ id = "test-cluster", nodeId = nodeId,
+ clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ )
+ val atomixClusterService = AtomixBluePrintClusterService()
+ atomixClusterService.startCluster(clusterInfo)
+ atomixClusterService.atomix
+ }
+ }
+ val atomix = deferred.awaitAll()
+ /** Test Distributed store creation */
+ repeat(2) { storeId ->
+ val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ assertNotNull(store, "failed to get store")
+ val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ store1.addListener {
+ log.info("Received map event : $it")
+ }
+ repeat(10) {
+ store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
+ }
+ delay(100)
+ store.close()
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..f1c625e8f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
@@ -0,0 +1,36 @@
+<!--
+ ~ Copyright © 2017-2018 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate" level="info"/>
+ <logger name="io.atomix" level="warn"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>