From 383235b495c32a1762511f1837bc9e98af6226eb Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 16 Dec 2019 20:59:41 -0500 Subject: Cluster distributed data store Add experimental cluster co-ordination service using Atomic framework. Included distributed data store creation utilities. Sample docker compose data cluster between cds controller and resource-resolution instances. Issue-ID: CCSDK-2000 Signed-off-by: Brinda Santh Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883 --- ms/blueprintsprocessor/application/pom.xml | 6 ++ .../src/main/dc/docker-compose-cluster.yaml | 85 +++++++++++++++ .../application/src/main/dc/docker-compose.yaml | 14 ++- .../application/src/main/docker/startService.sh | 8 +- .../BluePrintProcessorCluster.kt | 103 ++++++++++++++++++ .../main/resources/atomix/atomix-bootstrap.conf | 35 ++++++ .../main/resources/atomix/atomix-multicast.conf | 40 +++++++ .../application/src/main/resources/logback.xml | 1 + .../core/BluePrintConstants.kt | 8 ++ .../core/utils/ClusterUtils.kt | 28 +++++ .../modules/commons/atomix-lib/pom.xml | 56 ++++++++++ .../atomix/BluePrintAtomixLibConfiguration.kt | 31 ++++++ .../atomix/BluePrintAtomixLibExtensions.kt | 25 +++++ .../service/AtomixBluePrintClusterService.kt | 118 +++++++++++++++++++++ .../atomix/utils/AtomixLibUtils.kt | 82 ++++++++++++++ .../atomix/AtomixBluePrintClusterServiceTest.kt | 80 ++++++++++++++ .../atomix-lib/src/test/resources/logback-test.xml | 36 +++++++ ms/blueprintsprocessor/modules/commons/pom.xml | 1 + .../core/service/BluePrintClusterService.kt | 51 +++++++++ ms/blueprintsprocessor/parent/pom.xml | 28 +++++ 20 files changed, 834 insertions(+), 2 deletions(-) create mode 100644 ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml create mode 100644 ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt create mode 100644 ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf create mode 100644 ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf create mode 100644 ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt (limited to 'ms/blueprintsprocessor') diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml index f4d784bd3..a566c8447 100755 --- a/ms/blueprintsprocessor/application/pom.xml +++ b/ms/blueprintsprocessor/application/pom.xml @@ -95,6 +95,12 @@ 0.7.0-SNAPSHOT + + + org.onap.ccsdk.cds.blueprintsprocessor + atomix-lib + + org.onap.ccsdk.cds.blueprintsprocessor.functions diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml new file mode 100644 index 000000000..f4b4b7995 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml @@ -0,0 +1,85 @@ +version: '3.7' + +services: + db: + image: mariadb:latest + container_name: ccsdk-mariadb + networks: + - cds-network + ports: + - "3306:3306" + volumes: + - ~/vm_mysql:/var/lib/mysql + restart: always + environment: + MYSQL_ROOT_PASSWORD: sdnctl + MYSQL_DATABASE: sdnctl + MYSQL_USER: sdnctl + MYSQL_PASSWORD: sdnctl + cds-controller-1: + depends_on: + - db + image: onap/ccsdk-blueprintsprocessor:latest + container_name: cds-controller-1 + hostname: cds-controller-1 + networks: + - cds-network + ports: + - "8000:8080" + - "9111:9111" + restart: always + volumes: + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy + - target: /opt/app/onap/config + type: bind + source: ./config + environment: + # Same as hostname and container name + CLUSTER_ID: cds-cluster + CLUSTER_NODE_ID: cds-controller-1 + CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1 + CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster + #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf + APPLICATIONNAME: cds-controller + BUNDLEVERSION: 1.0.0 + APP_CONFIG_HOME: /opt/app/onap/config + STICKYSELECTORKEY: + ENVCONTEXT: dev + resource-resolution-1: + depends_on: + - db + image: onap/ccsdk-blueprintsprocessor:latest + container_name: resource-resolution-1 + hostname: resource-resolution-1 + networks: + - cds-network + ports: + - "8001:8080" + - "9112:9111" + restart: always + volumes: + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy + - target: /opt/app/onap/config + type: bind + source: ./config + environment: + CLUSTER_ID: cds-cluster + CLUSTER_NODE_ID: resource-resolution-1 + CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1 + CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster + #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf + APPLICATIONNAME: resource-resolution + BUNDLEVERSION: 1.0.0 + APP_CONFIG_HOME: /opt/app/onap/config + STICKYSELECTORKEY: + ENVCONTEXT: dev +volumes: + blueprints-deploy: + +networks: + cds-network: + driver: bridge diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml index 0ff04bf3a..d87770286 100755 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml @@ -1,9 +1,11 @@ -version: '3.3' +version: '3.7' services: db: image: mariadb:latest container_name: ccsdk-mariadb + networks: + - cds-network ports: - "3306:3306" volumes: @@ -20,6 +22,8 @@ services: image: onap/ccsdk-blueprintsprocessor:latest container_name: cds-controller-default hostname: cds-controller-default + networks: + - cds-network ports: - "8000:8080" - "9111:9111" @@ -37,6 +41,8 @@ services: - db image: onap/ccsdk-commandexecutor:latest container_name: bp-command-executor + networks: + - cds-network ports: - "50051:50051" restart: always @@ -48,6 +54,8 @@ services: image: onap/ccsdk-py-executor container_name: py-executor-default hostname: py-executor-default + networks: + - cds-network ports: - "50052:50052" restart: always @@ -65,3 +73,7 @@ services: volumes: blueprints-deploy: + +networks: + cds-network: + driver: bridge diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh index 7dcb5ff16..f5967dcb4 100644 --- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh +++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh @@ -2,7 +2,7 @@ nodeName=BlueprintsProcessor_1.0.0_$(cat /proc/self/cgroup | grep docker | sed s/\\//\\n/g | tail -1) -echo "APP Config HOME : ${APP_CONFIG_HOME}" +echo "${CLUSTER_ID}:${CLUSTER_NODE_ID} APP Config HOME : ${APP_CONFIG_HOME}" export APP_HOME=/opt/app/onap keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer @@ -18,4 +18,10 @@ exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sour -Djava.security.egd=file:/dev/./urandom \ -DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ -Dspring.config.location=${APP_CONFIG_HOME}/ \ +-DCLUSTER_ID=${CLUSTER_ID} \ +-DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \ +-DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \ +-DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \ +-DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \ +-DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \ org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt new file mode 100644 index 000000000..b78ebf68b --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt @@ -0,0 +1,103 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor + +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.boot.context.event.ApplicationReadyEvent +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component +import java.time.Duration +import javax.annotation.PreDestroy + +/** + * To Start the cluster, minimum 2 Instances/ Replicas od CDS needed. + * All instance such as Blueprintprocessor, ResourceResolution, MessagePrioritization should be in + * same cluster and should have same cluster name. + * + * Data can be shared only between the clusters, outside the cluster data can't be shared. + * If cds-controller-x instance wants to share data with resource-resolution-x instance, then it should be in the + * same cluster.(cds-cluster) and same network (cds-network) + * + * Assumptions: + * 1. Container, Pod and Host names are same. + * 2. Container names should end with sequence number. + * Blueprintprocessor example be : cds-controller-1, cds-controller-2, cds-controller-3 + * ResourceResolution example be : resource-resolution-1, resource-resolution-2, resource-resolution-3 + * 3. Each contained, should have environment properties CLUSTER_ID, CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, + * CLUSTER_MEMBERS, CLUSTER_STORAGE_PATH + * Example values : + * CLUSTER_ID: cds-cluster + * CLUSTER_NODE_ID: cds-controller-2 + * CLUSTER_NODE_ADDRESS: cds-controller-2 + * CLUSTER_MEMBERS: cds-controller-1,cds-controller-2,cds-controller-3,resource-resolution-1,resource-resolution-2,resource-resolution-3 + * CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster + * CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf + * 4. Cluster will be enabled only all the above properties present in the environments. + * if CLUSTER_ID is present, then it will try to create cluster. + */ +@Component +open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) { + + private val log = logger(BluePrintProcessorCluster::class) + + @EventListener(ApplicationReadyEvent::class) + fun startAndJoinCluster() = runBlocking { + val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) + + if (!clusterId.isNullOrEmpty()) { + + val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) + ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}") + + val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS) + ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}") + + val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS) + ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}") + + val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList() + + val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH) + ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}") + + val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE) + + val clusterInfo = ClusterInfo( + id = clusterId, nodeId = nodeId, + clusterMembers = clusterMemberList, nodeAddress = nodeAddress, + storagePath = clusterStorage, + configFile = clusterConfigFile + ) + bluePrintClusterService.startCluster(clusterInfo) + } else { + log.info( + "Cluster is disabled, to enable cluster set the environment " + + "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]" + ) + } + } + + @PreDestroy + fun shutDown() = runBlocking { + bluePrintClusterService.shutDown(Duration.ofSeconds(1)) + } +} diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf new file mode 100644 index 000000000..0fc31e00f --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf @@ -0,0 +1,35 @@ +cluster { + # Configure the cluster node information. + node { + id: ${CLUSTER_NODE_ID} + address: ${CLUSTER_NODE_ADDRESS} + } + # Configure the node discovery protocol. + discovery { + type: bootstrap + nodes.1 { + id: cds-controller-1 + address: "cds-controller-1:5679" + } + nodes.2 { + id: resource-reolution-1 + address: "resource-reolution-1:5679" + } + } +} +# Configure the system management group. +managementGroup { + type: raft + name: system + partitions: 1 + members: [${CLUSTER_MEMBERS}] + storage { + directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID} + level: DISK + } +} +# Configure a Raft partition group. +partitionGroups.data { + type: primary-backup + partitions: 7 +} diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf new file mode 100644 index 000000000..fd161879c --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf @@ -0,0 +1,40 @@ +cluster { + # Configure the cluster node information. + node { + id: ${CLUSTER_NODE_ID} + address: ${CLUSTER_NODE_ADDRESS} + } + # Configure the node discovery protocol. + discovery { + type: multicast + } + multicast: { + enabled: true + port: 54321 + } + # Configure the SWIM membership protocol. + protocol { + type: swim + broadcastUpdates: true + gossipInterval: 500ms + probeInterval: 2s + suspectProbes: 2 + } +} +# Configure the system management group. +managementGroup { + type: raft + name: system + partitions: 1 + members: [${CLUSTER_MEMBERS}] + storage { + directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID} + level: DISK + } +} + +# Configure a Raft partition group. +partitionGroups.data { + type: primary-backup + partitions: 7 +} diff --git a/ms/blueprintsprocessor/application/src/main/resources/logback.xml b/ms/blueprintsprocessor/application/src/main/resources/logback.xml index e1389a66f..d58be8ac7 100644 --- a/ms/blueprintsprocessor/application/src/main/resources/logback.xml +++ b/ms/blueprintsprocessor/application/src/main/resources/logback.xml @@ -39,6 +39,7 @@ + diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index fcc921cd9..caf063161 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -217,4 +217,12 @@ object BluePrintConstants { const val MODEL_TYPE_ARTIFACT_COMPONENT_JAR = "artifact-component-jar" val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean() + + /** Cluster Properties */ + const val PROPERTY_CLUSTER_ID = "CLUSTER_ID" + const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID" + const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS" + const val PROPERTY_CLUSTER_MEMBERS = "CLUSTER_MEMBERS" + const val PROPERTY_CLUSTER_STORAGE_PATH = "CLUSTER_STORAGE_PATH" + const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE" } diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt new file mode 100644 index 000000000..d3d621093 --- /dev/null +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.utils + +import java.net.InetAddress + +object ClusterUtils { + + /** get the local host name */ + fun hostname(): String { + val ip = InetAddress.getLocalHost() + return ip.hostName + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml new file mode 100644 index 000000000..7fa7b452a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml @@ -0,0 +1,56 @@ + + + + + 4.0.0 + + + org.onap.ccsdk.cds.blueprintsprocessor + commons + 0.7.0-SNAPSHOT + + + atomix-lib + jar + + Blueprints Processor Atomix Lib + Blueprints Processor Atomix Lib + + + + io.atomix + atomix + + + io.atomix + atomix-raft + + + io.atomix + atomix-primary-backup + + + io.atomix + atomix-gossip + + + + org.onap.ccsdk.cds.blueprintsprocessor + db-lib + + + diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt new file mode 100644 index 000000000..8ea15935f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt @@ -0,0 +1,31 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.context.annotation.Configuration + +@Configuration +open class BluePrintAtomixLibConfiguration + +/** + * Exposed Dependency Service by this Atomix Lib Module + */ +fun BluePrintDependencyService.clusterService(): BluePrintClusterService = + instance(AtomixBluePrintClusterService::class) diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt new file mode 100644 index 000000000..696d728dd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt @@ -0,0 +1,25 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import io.atomix.core.map.DistributedMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +fun > T.toDistributedMap(): DistributedMap<*, *> { + return if (this != null && this is DistributedMap<*, *>) this + else throw BluePrintProcessorException("map is not of type DistributedMap") +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt new file mode 100644 index 000000000..27921be9d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -0,0 +1,118 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service + +import io.atomix.cluster.ClusterMembershipEvent +import io.atomix.core.Atomix +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.CompletableFuture + +@Service +open class AtomixBluePrintClusterService : BluePrintClusterService { + + private val log = logger(AtomixBluePrintClusterService::class) + + lateinit var atomix: Atomix + + private var joined = false + + override suspend fun startCluster(clusterInfo: ClusterInfo) { + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "starting with members(${clusterInfo.clusterMembers})" + ) + + /** Create Atomix cluster either from config file or default multi-cast cluster*/ + atomix = if (!clusterInfo.configFile.isNullOrEmpty()) { + AtomixLibUtils.configAtomix(clusterInfo.configFile!!) + } else { + AtomixLibUtils.defaultMulticastAtomix(clusterInfo) + } + + /** Listen for the member chaneg events */ + atomix.membershipService.addListener { membershipEvent -> + when (membershipEvent.type()) { + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") + else -> log.info("***** Member event unknown") + } + } + atomix.start().join() + log.info( + "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + + "created successfully...." + ) + + /** Receive ping from network */ + val pingHandler = { message: String -> + log.info("####### ping message received : $message") + CompletableFuture.completedFuture(message) + } + atomix.communicationService.subscribe("ping", pingHandler) + + /** Ping the network */ + atomix.communicationService.broadcast( + "ping", + "ping from node(${clusterInfo.nodeId})" + ) + joined = true + } + + override fun clusterJoined(): Boolean { + return joined + } + + override suspend fun allMembers(): Set { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { + ClusterMember( + id = it.id().id(), + memberAddress = it.host() + ) + }.toSet() + } + + override suspend fun clusterMembersForPrefix(memberPrefix: String): Set { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + + return atomix.membershipService.members.filter { + it.id().id().startsWith(memberPrefix, true) + }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) } + .toSet() + } + + override suspend fun clusterMapStore(name: String): MutableMap { + return AtomixLibUtils.distributedMapStore(atomix, name) + } + + override suspend fun shutDown(duration: Duration) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt new file mode 100644 index 000000000..6e726a1a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.MissingNode +import com.fasterxml.jackson.databind.node.NullNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.atomix.core.Atomix +import io.atomix.core.map.DistributedMap +import io.atomix.protocols.backup.MultiPrimaryProtocol +import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup +import io.atomix.protocols.raft.partition.RaftPartitionGroup +import io.atomix.utils.net.Address +import org.jsoup.nodes.TextNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + +object AtomixLibUtils { + + fun configAtomix(filePath: String): Atomix { + val configFile = normalizedFile(filePath) + return Atomix.builder(configFile.absolutePath).build() + } + + fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + + val nodeId = clusterInfo.nodeId + + val raftPartitionGroup = RaftPartitionGroup.builder("system") + .withNumPartitions(7) + .withMembers(clusterInfo.clusterMembers) + .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) + .build() + + val primaryBackupGroup = + PrimaryBackupPartitionGroup.builder("data") + .withNumPartitions(31) + .build() + + return Atomix.builder() + .withMemberId(nodeId) + .withAddress(Address.from(clusterInfo.nodeAddress)) + .withManagementGroup(raftPartitionGroup) + .withPartitionGroups(primaryBackupGroup) + .withMulticastEnabled() + .build() + } + + fun distributedMapStore(atomix: Atomix, storeName: String): DistributedMap { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(2) + .build() + + return atomix.mapBuilder(storeName) + .withProtocol(protocol) + .withCacheEnabled() + .withValueType(JsonNode::class.java) + .withExtraTypes( + JsonNode::class.java, TextNode::class.java, ObjectNode::class.java, + ArrayNode::class.java, NullNode::class.java, MissingNode::class.java + ) + .build() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt new file mode 100644 index 000000000..919d6712b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.atomix + +import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import kotlin.test.assertNotNull + +class AtomixBluePrintClusterServiceTest { + val log = logger(AtomixBluePrintClusterServiceTest::class) + + @Before + fun init() { + runBlocking { + deleteNBDir("target/cluster") + } + } + + /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/ + @Test + fun testClusterJoin() { + runBlocking { + val members = arrayListOf("node-5679", "node-5680") + val deferred = arrayListOf(5679, 5680).map { port -> + async(Dispatchers.IO) { + val nodeId = "node-$port" + log.info("********** Starting node($nodeId) on port($port)") + val clusterInfo = ClusterInfo( + id = "test-cluster", nodeId = nodeId, + clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + ) + val atomixClusterService = AtomixBluePrintClusterService() + atomixClusterService.startCluster(clusterInfo) + atomixClusterService.atomix + } + } + val atomix = deferred.awaitAll() + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + delay(100) + store.close() + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml new file mode 100644 index 000000000..f1c625e8f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ + + + + + + + %d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n + + + + + + + + + + + + + + diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml index 30c34ab52..78c569125 100755 --- a/ms/blueprintsprocessor/modules/commons/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/pom.xml @@ -34,6 +34,7 @@ processor-core + atomix-lib db-lib rest-lib dmaap-lib diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt new file mode 100644 index 000000000..bbaa427c9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -0,0 +1,51 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.service + +import java.time.Duration + +interface BluePrintClusterService { + + /** Start the cluster with [clusterInfo], By default clustering service is disabled. + * Application module has to start cluster */ + suspend fun startCluster(clusterInfo: ClusterInfo) + + fun clusterJoined(): Boolean + + /** Returns all the data cluster members */ + suspend fun allMembers(): Set + + /** Returns data cluster members starting with prefix */ + suspend fun clusterMembersForPrefix(memberPrefix: String): Set + + /** Create and get or get the distributed data map store with [name] */ + suspend fun clusterMapStore(name: String): MutableMap + + /** Shut down the cluster with [duration] */ + suspend fun shutDown(duration: Duration) +} + +data class ClusterInfo( + val id: String, + var configFile: String? = null, + val nodeId: String, + val nodeAddress: String, + var clusterMembers: List, + var storagePath: String +) + +data class ClusterMember(val id: String, val memberAddress: String?) diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml index b80639725..091d9dc68 100755 --- a/ms/blueprintsprocessor/parent/pom.xml +++ b/ms/blueprintsprocessor/parent/pom.xml @@ -25,6 +25,7 @@ org.onap.ccsdk.cds.blueprintsprocessor parent + 0.7.0-SNAPSHOT pom Blueprints Processor Parent @@ -296,6 +297,28 @@ ${netty-ssl} + + + io.atomix + atomix + ${atomix.version} + + + io.atomix + atomix-raft + ${atomix.version} + + + io.atomix + atomix-primary-backup + ${atomix.version} + + + io.atomix + atomix-gossip + ${atomix.version} + + org.apache.sshd @@ -376,6 +399,11 @@ processor-core ${project.version} + + org.onap.ccsdk.cds.blueprintsprocessor + atomix-lib + ${project.version} + org.onap.ccsdk.cds.blueprintsprocessor db-lib -- cgit 1.2.3-korg