summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt2
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt14
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt7
-rwxr-xr-xms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt17
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt37
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt47
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt40
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt6
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt2
12 files changed, 190 insertions, 25 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index 351cf4776..9439c2d98 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -235,8 +235,6 @@ object BluePrintConstants {
const val TOSCA_SPEC = "TOSCA"
- val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
-
const val LOG_PROTECT: String = "log-protect"
/** Cluster Properties */
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt
index 0f7cb79f2..aa61b0c4d 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt
@@ -36,6 +36,4 @@ interface BluePrintScriptsService {
suspend fun <T> scriptInstance(cacheKey: String, scriptClassName: String): T
suspend fun <T> scriptInstance(scriptClassName: String): T
-
- suspend fun cleanupInstance(blueprintBasePath: String)
}
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt
index d1b42ffe5..230097f3c 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt
@@ -16,8 +16,13 @@
package org.onap.ccsdk.cds.controllerblueprints.core.scripts
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+import com.google.common.cache.LoadingCache
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
import org.jetbrains.kotlin.cli.common.ExitCode
import org.jetbrains.kotlin.cli.common.arguments.parseCommandLineArguments
import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation
@@ -44,6 +49,8 @@ open class BluePrintCompileService {
val classPaths = classpathFromClasspathProperty()?.joinToString(File.pathSeparator) {
it.absolutePath
}
+ val mutexCache: LoadingCache<String, Mutex> = CacheBuilder.newBuilder()
+ .build(CacheLoader.from { s -> Mutex() })
}
/** Compile the [bluePrintSourceCode] and get the [kClassName] instance for the constructor [args] */
@@ -54,8 +61,11 @@ open class BluePrintCompileService {
): T {
/** Compile the source code if needed */
log.debug("Jar Exists : ${bluePrintSourceCode.targetJarFile.exists()}, Regenerate : ${bluePrintSourceCode.regenerate}")
- if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
- compile(bluePrintSourceCode)
+
+ mutexCache.get(bluePrintSourceCode.targetJarFile.absolutePath).withLock {
+ if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) {
+ compile(bluePrintSourceCode)
+ }
}
val classLoaderWithDependencies = BluePrintCompileCache.classLoader(bluePrintSourceCode.cacheKey)
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt
index fa8ca2719..f3eb1a2b9 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt
@@ -79,11 +79,4 @@ open class BluePrintScriptsServiceImpl : BluePrintScriptsService {
return Thread.currentThread().contextClassLoader.loadClass(scriptClassName).constructors
.single().newInstance(*args.toArray()) as T
}
-
- override suspend fun cleanupInstance(blueprintBasePath: String) {
- if (!BluePrintConstants.USE_SCRIPT_COMPILE_CACHE) {
- log.info("Invalidating compile cache for blueprint ($blueprintBasePath)")
- BluePrintCompileCache.cleanClassLoader(blueprintBasePath)
- }
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt
index 1b58bc082..6637c62ec 100755
--- a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt
+++ b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt
@@ -18,6 +18,8 @@
package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository
@@ -34,6 +36,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName
import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs
import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils
import org.slf4j.LoggerFactory
import org.springframework.dao.DataIntegrityViolationException
@@ -60,7 +63,7 @@ class BlueprintProcessorCatalogServiceImpl(
// Clean blueprint script cache
val cacheKey = BluePrintFileUtils
.compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version))
- BluePrintCompileCache.cleanClassLoader(cacheKey)
+ cleanClassLoader(cacheKey)
log.info("removed cba file name($name), version($version) from cache")
// Cleaning Deployed Blueprint
deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version)
@@ -132,7 +135,7 @@ class BlueprintProcessorCatalogServiceImpl(
normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion)
val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile)
- BluePrintCompileCache.cleanClassLoader(cacheKey)
+ cleanClassLoader(cacheKey)
deleteNBDir(deployFile).let {
if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion")
@@ -174,4 +177,14 @@ class BlueprintProcessorCatalogServiceImpl(
)
}
}
+
+ private suspend fun cleanClassLoader(cacheKey: String) {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ if (null == clusterService)
+ BluePrintCompileCache.cleanClassLoader(cacheKey)
+ else {
+ log.info("Sending ClusterMessage: Clean Classloader Cache")
+ clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey)
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt
new file mode 100644
index 000000000..090130f98
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+enum class BlueprintClusterTopic {
+ BLUEPRINT_CLEAN_COMPILER_CACHE
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
index 5493bac1a..fb9056776 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
@@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance
import com.hazelcast.cp.CPSubsystemManagementService
import com.hazelcast.cp.lock.FencedLock
import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import com.hazelcast.topic.Message
+import com.hazelcast.topic.MessageListener
import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
@@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import java.time.Duration
+import java.util.UUID
+
import java.util.concurrent.TimeUnit
@Service
-open class HazelcastClusterService : BluePrintClusterService {
+open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService {
private val log = logger(HazelcastClusterService::class)
lateinit var hazelcast: HazelcastInstance
@@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService {
log.info(
"Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
)
+ applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
}
override fun isClient(): Boolean {
@@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService {
}
}
+ override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
+ hazelcast.getReliableTopic<T>(topic.name).publish(message)
+ }
+
+ override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
+ return hazelcast.getReliableTopic<T>(topic.name)
+ .addMessageListener(HazelcastMessageListenerAdapter(listener))
+ }
+
+ override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
+ log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
+ return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
+ }
+
/** Utils */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
if (!joinedClient && !joinedLite) {
@@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
override fun close() {
}
}
+
+class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
+ override fun onMessage(message: Message<E>?) = message?.let {
+ BluePrintClusterMessage<E>(
+ BlueprintClusterTopic.valueOf(it.source as String),
+ it.messageObject,
+ it.publishTime,
+ it.publishingMember.toClusterMember()
+ )
+ }.let { listener.onMessage(it) }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt
new file mode 100644
index 000000000..3833379c9
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2020 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+
+@Component
+@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true")
+open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> {
+ private val log = logger(BlueprintCompilerCacheMessageListener::class)
+
+ @EventListener(ClusterJoinedEvent::class)
+ fun register() {
+ log.info("Registering BlueprintCompilerCacheMessageListener")
+ clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this)
+ }
+
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ message?.let {
+ log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})")
+ BluePrintCompileCache.cleanClassLoader(it.payload)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index 8cb9f4f92..f7ba6f25f 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -16,8 +16,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic
+import org.springframework.context.ApplicationEvent
import java.time.Duration
import java.util.Properties
+import java.util.UUID
interface BluePrintClusterService {
@@ -53,6 +56,15 @@ interface BluePrintClusterService {
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
+
+ /** Send [message] to the listener(s) of a [topic] */
+ suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T)
+
+ /** Register a [listener] to a [topic] and returns his UUID */
+ fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID
+
+ /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */
+ fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean
}
data class ClusterInfo(
@@ -83,4 +95,12 @@ interface ClusterLock {
fun close()
}
+class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember)
+
+interface BlueprintClusterMessageListener<E> {
+ fun onMessage(message: BluePrintClusterMessage<E>?)
+}
+
+class ClusterJoinedEvent(source: Any) : ApplicationEvent(source)
+
const val CDS_LOCK_GROUP = "cds-lock"
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
index da5564235..ded017940 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
@@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member
import com.hazelcast.config.FileSystemYamlConfig
import com.hazelcast.instance.impl.HazelcastInstanceFactory
import com.hazelcast.map.IMap
+import io.mockk.mockk
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
@@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext
import org.junit.After
import org.junit.Before
import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
@@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import java.io.Serializable
import java.util.Properties
import kotlin.test.assertEquals
+import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -91,6 +95,40 @@ class HazelcastClusterServiceTest {
}
}
+ @Test
+ fun testClusterMessaging() {
+ runBlocking {
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(1, 2, 3)).toMutableList()
+ printReachableMembers(bluePrintClusterServiceOne)
+ testMessageReceived(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) {
+ val sender = bluePrintClusterServices[0] as HazelcastClusterService
+ val receiver = bluePrintClusterServices[1] as HazelcastClusterService
+ val messageSent = "hello world"
+ var isMessageReceived = false
+ val uuid = receiver.addBlueprintClusterMessageListener(
+ BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE,
+ object : BlueprintClusterMessageListener<String> {
+ override fun onMessage(message: BluePrintClusterMessage<String>?) {
+ log.info("Message received - ${message?.payload}")
+ isMessageReceived = messageSent == message?.payload
+ }
+ }
+ )
+
+ assertNotNull(uuid)
+ sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent)
+ delay(1000)
+ assertTrue(isMessageReceived)
+
+ assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid))
+ }
+
private suspend fun createCluster(
ids: List<Int>,
joinAsClient: Boolean? = false
@@ -117,7 +155,7 @@ class HazelcastClusterServiceTest {
properties = properties
)
}
- val hazelcastClusterService = HazelcastClusterService()
+ val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true))
hazelcastClusterService.startCluster(clusterInfo)
hazelcastClusterService
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
index d107f01e5..3483ce1d7 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
@@ -101,10 +101,4 @@ class ComponentFunctionScriptingService(
}
return scriptComponent
}
-
- suspend fun cleanupInstance(bluePrintContext: BluePrintContext, scriptType: String) {
- if (scriptType == BluePrintConstants.SCRIPT_KOTLIN) {
- BluePrintScriptsServiceImpl().cleanupInstance(bluePrintContext.rootPath)
- }
- }
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt
index 34eaf6226..a7ef0a8a0 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt
@@ -61,8 +61,6 @@ open class ComponentScriptExecutor(private var componentFunctionScriptingService
// Handles both script processing and error handling
scriptComponentFunction.executeScript(executionServiceInput)
-
- componentFunctionScriptingService.cleanupInstance(bluePrintRuntimeService.bluePrintContext(), scriptType)
}
override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {