summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml64
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml36
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt16
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt35
6 files changed, 144 insertions, 30 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index a37089f10..020038c26 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -45,8 +47,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: controller-config
environment:
# Same as hostname and container name
CLUSTER_ENABLED: "true"
@@ -79,8 +81,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: resource-resolution-config
environment:
CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
@@ -94,8 +96,60 @@ services:
APP_CONFIG_HOME: /opt/app/onap/config
STICKYSELECTORKEY:
ENVCONTEXT: dev
+ py-executor-0:
+ depends_on:
+ - db
+ - nats
+ image: onap/ccsdk-py-executor
+ container_name: py-executor-0
+ hostname: py-executor-0
+ networks:
+ - cds-network
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ environment:
+ CLUSTER_ID: cds-cluster
+ CLUSTER_NODE_ID: py-executor-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+ NATS_HOSTS: nats://nats:4222
+ APPLICATIONNAME: py-executor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ AUTH_TYPE: tls-auth
+ LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
+ resource-resolution-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/resource-resolution/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index d87770286..20b17bc90 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -29,7 +31,12 @@ services:
- "9111:9111"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ - target: /opt/app/onap/config
+ type: volume
+ source: controller-config
environment:
APPLICATIONNAME: cds-controller
BUNDLEVERSION: 1.0.0
@@ -47,7 +54,9 @@ services:
- "50051:50051"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
py-executor-default:
depends_on:
- db
@@ -60,7 +69,9 @@ services:
- "50052:50052"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
environment:
APPLICATIONNAME: py-executor
BUNDLEVERSION: 1.0.0
@@ -72,7 +83,24 @@ services:
LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 39d081455..bef7a7b61 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -36,14 +36,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() =
/**
* MessagePrioritization correlation extensions
*/
+
+/**
+ * Arrange comma separated correlation keys in ascending order.
+ */
fun MessagePrioritization.toFormatedCorrelation(): String {
- val ascendingKey = this.correlationId!!.split(",")
+ return this.correlationId!!.split(",")
.map { it.trim() }.sorted().joinToString(",")
- return ascendingKey
}
+/**
+ * Used to group the correlation with respect to types.
+ */
fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
- val ascendingKey = this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
- return TypeCorrelationKey(this.type, ascendingKey)
+ return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index d1f38f4f2..49230b6e4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -22,6 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -32,8 +33,12 @@ object MessageProcessorUtils {
clusterService: BluePrintClusterService?,
messagePrioritization: MessagePrioritization
): ClusterLock? {
- return if (clusterService != null && clusterService.clusterJoined()) {
- val lockName = "prioritization-${messagePrioritization.group}"
+ return if (clusterService != null && clusterService.clusterJoined() &&
+ !messagePrioritization.correlationId.isNullOrBlank()
+ ) {
+ // Get the correlation key in ascending order, even it it is misplaced
+ val correlationId = messagePrioritization.toFormatedCorrelation()
+ val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index a2a0d3902..9be15f2e3 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -93,10 +94,21 @@ object AtomixLibUtils {
val protocol = MultiPrimaryProtocol.builder()
.withBackups(numBackups)
.build()
+ return atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ }
+
+ /** get Atomic distributed lock, to get lock fence information */
+ fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
- val lock = atomix.lockBuilder(lockName)
+ return atomix.atomicLockBuilder(lockName)
.withProtocol(protocol)
.build()
- return lock
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 39453fc7a..67bf4cabb 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
- val log = logger(AtomixBluePrintClusterServiceTest::class)
+ private val log = logger(AtomixBluePrintClusterServiceTest::class)
@Before
fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
@Test
fun testClusterJoin() {
runBlocking {
- val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
- val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680)).toMutableList()
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ val bluePrintClusterService = bluePrintClusterServiceOne[0]
log.info("Members : ${bluePrintClusterService.allMembers()}")
log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ private suspend fun createCluster(
+ ports: List<Int>,
+ otherClusterPorts: List<Int>? = null
+ ): List<BluePrintClusterService> {
+
return withContext(Dispatchers.Default) {
- val members = ports.map { "node-$it" }
+ val clusterMembers = ports.map { "node-$it" }.toMutableList()
+ /** Add the other cluster as members */
+ if (!otherClusterPorts.isNullOrEmpty()) {
+ val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+ clusterMembers.addAll(otherClusterMembers)
+ }
val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
val clusterInfo = ClusterInfo(
id = "test-cluster", nodeId = nodeId,
- clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices[0], "first", lockName)
}
val deferred2 = async {
- executeLock(bluePrintClusterServices.get(0), "second", lockName)
+ executeLock(bluePrintClusterServices[0], "second", lockName)
}
val deferred3 = async {
- executeLock(bluePrintClusterServices.get(0), "third", lockName)
+ executeLock(bluePrintClusterServices[1], "third", lockName)
}
deferred.start()
deferred2.start()