summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/processor-core
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/processor-core')
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt37
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt47
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt40
5 files changed, 163 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt
new file mode 100644
index 000000000..090130f98
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+enum class BlueprintClusterTopic {
+ BLUEPRINT_CLEAN_COMPILER_CACHE
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
index 5493bac1a..fb9056776 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
@@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance
import com.hazelcast.cp.CPSubsystemManagementService
import com.hazelcast.cp.lock.FencedLock
import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import com.hazelcast.topic.Message
+import com.hazelcast.topic.MessageListener
import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
@@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import java.time.Duration
+import java.util.UUID
+
import java.util.concurrent.TimeUnit
@Service
-open class HazelcastClusterService : BluePrintClusterService {
+open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService {
private val log = logger(HazelcastClusterService::class)
lateinit var hazelcast: HazelcastInstance
@@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService {
log.info(
"Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
)
+ applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
}
override fun isClient(): Boolean {
@@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService {
}
}
+ override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
+ hazelcast.getReliableTopic<T>(topic.name).publish(message)
+ }
+
+ override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
+ return hazelcast.getReliableTopic<T>(topic.name)
+ .addMessageListener(HazelcastMessageListenerAdapter(listener))
+ }
+
+ override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
+ return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
+ }
+
/** Utils */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
if (!joinedClient && !joinedLite) {
@@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
override fun close() {
}
}
+
+class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
+ override fun onMessage(message: Message<E>?) = message?.let {
+ BluePrintClusterMessage<E>(
+ BlueprintClusterTopic.valueOf(it.source as String),
+ it.messageObject,
+ it.publishTime,
+ it.publishingMember.toClusterMember()
+ )
+ }.let { listener.onMessage(it) }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt
new file mode 100644
index 000000000..3833379c9
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+
+@Component
+@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true")
+open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> {
+ private val log = logger(BlueprintCompilerCacheMessageListener::class)
+
+ @EventListener(ClusterJoinedEvent::class)
+ fun register() {
+ log.info("Registering BlueprintCompilerCacheMessageListener")
+ clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this)
+ }
+
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ message?.let {
+ log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})")
+ BluePrintCompileCache.cleanClassLoader(it.payload)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index 8cb9f4f92..f7ba6f25f 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -16,8 +16,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.springframework.context.ApplicationEvent
import java.time.Duration
import java.util.Properties
+import java.util.UUID
interface BluePrintClusterService {
@@ -53,6 +56,15 @@ interface BluePrintClusterService {
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
+
+ /** Send [message] to the listener(s) of a [topic] */
+ suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T)
+
+ /** Register a [listener] to a [topic] and returns his UUID */
+ fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID
+
+ /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */
+ fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean
}
data class ClusterInfo(
@@ -83,4 +95,12 @@ interface ClusterLock {
fun close()
}
+class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember)
+
+interface BlueprintClusterMessageListener<E> {
+ fun onMessage(message: BluePrintClusterMessage<E>?)
+}
+
+class ClusterJoinedEvent(source: Any) : ApplicationEvent(source)
+
const val CDS_LOCK_GROUP = "cds-lock"
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
index da5564235..ded017940 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
@@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member
import com.hazelcast.config.FileSystemYamlConfig
import com.hazelcast.instance.impl.HazelcastInstanceFactory
import com.hazelcast.map.IMap
+import io.mockk.mockk
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
@@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext
import org.junit.After
import org.junit.Before
import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
@@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import java.io.Serializable
import java.util.Properties
import kotlin.test.assertEquals
+import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -91,6 +95,40 @@ class HazelcastClusterServiceTest {
}
}
+ @Test
+ fun testClusterMessaging() {
+ runBlocking {
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(1, 2, 3)).toMutableList()
+ printReachableMembers(bluePrintClusterServiceOne)
+ testMessageReceived(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
+ val sender = bluePrintClusterServices[0] as HazelcastClusterService
+ val receiver = bluePrintClusterServices[1] as HazelcastClusterService
+ val messageSent = "hello world"
+ var isMessageReceived = false
+ val uuid = receiver.addBlueprintClusterMessageListener(
+ BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
+ object : BlueprintClusterMessageListener<String> {
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ log.info("Message received - ${message?.payload}")
+ isMessageReceived = messageSent == message?.payload
+ }
+ }
+ )
+
+ assertNotNull(uuid)
+ sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
+ delay(1000)
+ assertTrue(isMessageReceived)
+
+ assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ }
+
private suspend fun createCluster(
ids: List<Int>,
joinAsClient: Boolean? = false
@@ -117,7 +155,7 @@ class HazelcastClusterServiceTest {
properties = properties
)
}
- val hazelcastClusterService = HazelcastClusterService()
+ val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
hazelcastClusterService.startCluster(clusterInfo)
hazelcastClusterService
}