aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt23
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt63
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml2
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt4
9 files changed, 81 insertions, 26 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
index 696d728dd..17d243620 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
@@ -16,10 +16,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+import com.fasterxml.jackson.databind.JsonNode
import io.atomix.core.map.DistributedMap
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> {
- return if (this != null && this is DistributedMap<*, *>) this
+fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> {
+ return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode>
else throw BluePrintProcessorException("map is not of type DistributedMap")
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
index b5ec6dd5c..0690eb89d 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -55,10 +55,12 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
when (membershipEvent.type()) {
ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
else -> log.info("Member event unknown")
}
}
+ /** Start and Join the Cluster */
atomix.start().join()
log.info(
"Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -83,6 +85,19 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
return atomix.isRunning
}
+ override suspend fun masterMember(partitionGroup: String): ClusterMember {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ check(atomix.isRunning) { "cluster is not running" }
+ val masterId = atomix.partitionService
+ .getPartitionGroup(partitionGroup)
+ .getPartition("1").primary()
+ val masterMember = atomix.membershipService.getMember(masterId)
+ return ClusterMember(
+ id = masterMember.id().id(),
+ memberAddress = masterMember.address().toString()
+ )
+ }
+
override suspend fun allMembers(): Set<ClusterMember> {
check(::atomix.isInitialized) { "failed to start and join cluster" }
check(atomix.isRunning) { "cluster is not running" }
@@ -90,7 +105,7 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
return atomix.membershipService.members.map {
ClusterMember(
id = it.id().id(),
- memberAddress = it.host()
+ memberAddress = it.address().toString()
)
}.toSet()
}
@@ -153,4 +168,10 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String)
override fun isLocked(): Boolean {
return distributedLock.isLocked
}
+
+ override fun close() {
+ if (::distributedLock.isInitialized) {
+ distributedLock.close()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index b25706972..39453fc7a 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -17,7 +17,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import com.fasterxml.jackson.databind.JsonNode
-import io.atomix.core.Atomix
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
@@ -27,7 +26,7 @@ import kotlinx.coroutines.withContext
import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
@@ -45,12 +44,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
+ /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/
@Test
fun testClusterJoin() {
runBlocking {
- val members = arrayListOf("node-5679", "node-5680")
- val deferred = arrayListOf(5679, 5680).map { port ->
+ val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
+ val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ log.info("Members : ${bluePrintClusterService.allMembers()}")
+ log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
+ log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
+ testDistributedStore(bluePrintClusterServiceOne)
+ testDistributedLock(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ return withContext(Dispatchers.Default) {
+ val members = ports.map { "node-$it" }
+ val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
@@ -60,43 +72,46 @@ class AtomixBluePrintClusterServiceTest {
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
- atomixClusterService.atomix
+ atomixClusterService
}
}
- val atomixs = deferred.awaitAll()
- testDistributedStore(atomixs)
- testDistributedLock(atomixs)
+ deferred.awaitAll()
}
}
- private suspend fun testDistributedStore(atomix: List<Atomix>) {
+ private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ).toDistributedMap()
+
store1.addListener {
log.info("Received map event : $it")
}
- repeat(10) {
+ repeat(5) {
store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
}
- delay(100)
+ delay(10)
store.close()
}
}
- private suspend fun testDistributedLock(atomix: List<Atomix>) {
+ private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(atomix.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices.get(0), "first", lockName)
}
val deferred2 = async {
- executeLock(atomix.get(1), "second", lockName)
+ executeLock(bluePrintClusterServices.get(0), "second", lockName)
}
val deferred3 = async {
- executeLock(atomix.get(1), "third", lockName)
+ executeLock(bluePrintClusterServices.get(0), "third", lockName)
}
deferred.start()
deferred2.start()
@@ -104,17 +119,21 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+ private suspend fun executeLock(
+ bluePrintClusterService: BluePrintClusterService,
+ lockId: String,
+ lockName: String
+ ) {
log.info("initialising $lockId lock...")
- val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+ val distributedLock = bluePrintClusterService.clusterLock(lockName)
assertNotNull(distributedLock, "failed to create distributed $lockId lock")
distributedLock.lock()
- assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+ assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
try {
log.info("locked $lockId process for 5mSec")
delay(5)
} finally {
- distributedLock.unlock()
+ distributedLock.unLock()
log.info("$lockId lock released")
}
distributedLock.close()
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
index f1c625e8f..016d48636 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n</pattern>
</encoder>
</appender>
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
index 709ee7d6e..147d360ba 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.nats
import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
@@ -31,9 +32,14 @@ open class BluePrintNatsLibConfiguration
fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService =
instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY)
+fun BluePrintDependencyService.controllerNatsService(): BluePrintNatsService {
+ return natsLibPropertyService().bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR)
+}
+
class NatsLibConstants {
companion object {
const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
+ const val DEFULT_NATS_SELECTOR = "cds-controller"
const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
const val TYPE_TOKEN_AUTH = "token-auth"
const val TYPE_TLS_AUTH = "tls-auth"
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
index 3329ec200..9767ac29d 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
@@ -23,6 +23,8 @@ open class NatsConnectionProperties {
var clusterId: String = ClusterUtils.clusterId()
var clientId: String = ClusterUtils.clusterNodeId()
lateinit var host: String
+ /** Rest endpoint selector to access Monitoring API */
+ var monitoringSelector: String? = null
}
open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() {
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
index 3781fae59..00a972eff 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt
@@ -34,6 +34,7 @@ open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsC
val serverList = natsConnectionProperties.host.splitCommaAsList()
val options = Options.Builder()
+ .connectionName(natsConnectionProperties.clientId)
.servers(serverList.toTypedArray())
// .sslContext(sslContext())
.build()
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
index 0da3022ff..60b7934ba 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
@@ -33,6 +33,7 @@ open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthN
val serverList = natsConnectionProperties.host.splitCommaAsList()
val options = Options.Builder()
+ .connectionName(natsConnectionProperties.clientId)
.servers(serverList.toTypedArray())
.token(natsConnectionProperties.token.toCharArray())
.build()
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index 6fd443624..21fcfc509 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -26,6 +26,9 @@ interface BluePrintClusterService {
fun clusterJoined(): Boolean
+ /** Returns [partitionGroup] master member */
+ suspend fun masterMember(partitionGroup: String): ClusterMember
+
/** Returns all the data cluster members */
suspend fun allMembers(): Set<ClusterMember>
@@ -58,4 +61,5 @@ interface ClusterLock {
suspend fun tryLock(timeout: Long): Boolean
suspend fun unLock()
fun isLocked(): Boolean
+ fun close()
}