diff options
17 files changed, 192 insertions, 48 deletions
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt index b64790065..49b9207db 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt @@ -66,11 +66,6 @@ open class ComponentNetconfExecutor(private var componentFunctionScriptingServic // Handles both script processing and error handling scriptComponent.executeScript(executionServiceInput) - - componentFunctionScriptingService.cleanupInstance( - bluePrintRuntimeService.bluePrintContext(), - scriptType - ) } override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt index b1dc14091..1a557f5bc 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt @@ -83,8 +83,6 @@ open class CapabilityResourceResolutionProcessor(private var componentFunctionSc // Invoke componentResourceAssignmentProcessor componentResourceAssignmentProcessor!!.executeScript(resourceAssignment) - - componentFunctionScriptingService.cleanupInstance(raRuntimeService.bluePrintContext(), scriptType) } } diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt index 8a954c130..4ce5df18f 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt @@ -86,10 +86,6 @@ class IpAssignResolutionCapabilityTest { .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any()) } returns IpAssignResolutionCapability() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk<ResourceAssignmentRuntimeService>() every { raRuntimeService.bluePrintContext() } returns mockk() every { raRuntimeService.getInputValue("fixed_ipv4_Address_01") } returns NullNode.getInstance() diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt index 449845f7f..8c0aca49e 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt @@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.util import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ComponentFunctionScriptingService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintError import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType @@ -87,11 +88,8 @@ class NamingResolutionCapabilityTest { .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any()) } returns NamingResolutionCapability() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk<ResourceAssignmentRuntimeService>() + every { raRuntimeService.getBluePrintError() } returns BluePrintError() every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>() every { raRuntimeService.getInputValue("vf-module-name") } returns NullNode.getInstance() every { raRuntimeService.getInputValue("vnfc-name") } returns NullNode.getInstance() diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt index f618b41db..1b0058b90 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt @@ -52,10 +52,6 @@ class CapabilityResourceResolutionProcessorTest { .scriptInstance<ResourceAssignmentProcessor>(any(), any(), any()) } returns MockCapabilityScriptRA() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk<ResourceAssignmentRuntimeService>() every { raRuntimeService.bluePrintContext() } returns mockk<BluePrintContext>() every { raRuntimeService.getInputValue("test-property") } returns NullNode.getInstance() @@ -100,10 +96,6 @@ class CapabilityResourceResolutionProcessorTest { .scriptInstance<ResourceAssignmentProcessor>(any(), BluePrintConstants.SCRIPT_JYTHON, any()) } returns MockCapabilityScriptRA() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val resourceAssignmentRuntimeService = ResourceAssignmentRuntimeService("1234", bluePrintContext) val capabilityResourceResolutionProcessor = diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 351cf4776..9439c2d98 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -235,8 +235,6 @@ object BluePrintConstants { const val TOSCA_SPEC = "TOSCA" - val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean() - const val LOG_PROTECT: String = "log-protect" /** Cluster Properties */ diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt index 0f7cb79f2..aa61b0c4d 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt @@ -36,6 +36,4 @@ interface BluePrintScriptsService { suspend fun <T> scriptInstance(cacheKey: String, scriptClassName: String): T suspend fun <T> scriptInstance(scriptClassName: String): T - - suspend fun cleanupInstance(blueprintBasePath: String) } diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt index d1b42ffe5..230097f3c 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt @@ -16,8 +16,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.scripts +import com.google.common.cache.CacheBuilder +import com.google.common.cache.CacheLoader +import com.google.common.cache.LoadingCache import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.jetbrains.kotlin.cli.common.ExitCode import org.jetbrains.kotlin.cli.common.arguments.parseCommandLineArguments import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation @@ -44,6 +49,8 @@ open class BluePrintCompileService { val classPaths = classpathFromClasspathProperty()?.joinToString(File.pathSeparator) { it.absolutePath } + val mutexCache: LoadingCache<String, Mutex> = CacheBuilder.newBuilder() + .build(CacheLoader.from { s -> Mutex() }) } /** Compile the [bluePrintSourceCode] and get the [kClassName] instance for the constructor [args] */ @@ -54,8 +61,11 @@ open class BluePrintCompileService { ): T { /** Compile the source code if needed */ log.debug("Jar Exists : ${bluePrintSourceCode.targetJarFile.exists()}, Regenerate : ${bluePrintSourceCode.regenerate}") - if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) { - compile(bluePrintSourceCode) + + mutexCache.get(bluePrintSourceCode.targetJarFile.absolutePath).withLock { + if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) { + compile(bluePrintSourceCode) + } } val classLoaderWithDependencies = BluePrintCompileCache.classLoader(bluePrintSourceCode.cacheKey) diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt index fa8ca2719..f3eb1a2b9 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt @@ -79,11 +79,4 @@ open class BluePrintScriptsServiceImpl : BluePrintScriptsService { return Thread.currentThread().contextClassLoader.loadClass(scriptClassName).constructors .single().newInstance(*args.toArray()) as T } - - override suspend fun cleanupInstance(blueprintBasePath: String) { - if (!BluePrintConstants.USE_SCRIPT_COMPILE_CACHE) { - log.info("Invalidating compile cache for blueprint ($blueprintBasePath)") - BluePrintCompileCache.cleanClassLoader(blueprintBasePath) - } - } } diff --git a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt index 1b58bc082..6637c62ec 100755 --- a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt @@ -18,6 +18,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository @@ -34,6 +36,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils import org.slf4j.LoggerFactory import org.springframework.dao.DataIntegrityViolationException @@ -60,7 +63,7 @@ class BlueprintProcessorCatalogServiceImpl( // Clean blueprint script cache val cacheKey = BluePrintFileUtils .compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version)) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) log.info("removed cba file name($name), version($version) from cache") // Cleaning Deployed Blueprint deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version) @@ -132,7 +135,7 @@ class BlueprintProcessorCatalogServiceImpl( normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion) val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) deleteNBDir(deployFile).let { if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion") @@ -174,4 +177,14 @@ class BlueprintProcessorCatalogServiceImpl( ) } } + + private suspend fun cleanClassLoader(cacheKey: String) { + val clusterService = BluePrintDependencyService.optionalClusterService() + if (null == clusterService) + BluePrintCompileCache.cleanClassLoader(cacheKey) + else { + log.info("Sending ClusterMessage: Clean Classloader Cache") + clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt new file mode 100644 index 000000000..090130f98 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt @@ -0,0 +1,21 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +enum class BlueprintClusterTopic { + BLUEPRINT_CLEAN_COMPILER_CACHE +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt index 5493bac1a..fb9056776 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance import com.hazelcast.cp.CPSubsystemManagementService import com.hazelcast.cp.lock.FencedLock import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import com.hazelcast.topic.Message +import com.hazelcast.topic.MessageListener import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants @@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Duration +import java.util.UUID + import java.util.concurrent.TimeUnit @Service -open class HazelcastClusterService : BluePrintClusterService { +open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService { private val log = logger(HazelcastClusterService::class) lateinit var hazelcast: HazelcastInstance @@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService { log.info( "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." ) + applicationEventPublisher.publishEvent(ClusterJoinedEvent(this)) } override fun isClient(): Boolean { @@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService { } } + override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) { + hazelcast.getReliableTopic<T>(topic.name).publish(message) + } + + override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...") + return hazelcast.getReliableTopic<T>(topic.name) + .addMessageListener(HazelcastMessageListenerAdapter(listener)) + } + + override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...") + return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid) + } + /** Utils */ suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { if (!joinedClient && !joinedLite) { @@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val override fun close() { } } + +class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> { + override fun onMessage(message: Message<E>?) = message?.let { + BluePrintClusterMessage<E>( + BlueprintClusterTopic.valueOf(it.source as String), + it.messageObject, + it.publishTime, + it.publishingMember.toClusterMember() + ) + }.let { listener.onMessage(it) } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt new file mode 100644 index 000000000..3833379c9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners + +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component + +@Component +@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true") +open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> { + private val log = logger(BlueprintCompilerCacheMessageListener::class) + + @EventListener(ClusterJoinedEvent::class) + fun register() { + log.info("Registering BlueprintCompilerCacheMessageListener") + clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this) + } + + override fun onMessage(message: BluePrintClusterMessage<String>?) { + message?.let { + log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})") + BluePrintCompileCache.cleanClassLoader(it.payload) + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 8cb9f4f92..f7ba6f25f 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -16,8 +16,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.springframework.context.ApplicationEvent import java.time.Duration import java.util.Properties +import java.util.UUID interface BluePrintClusterService { @@ -53,6 +56,15 @@ interface BluePrintClusterService { /** Shut down the cluster with [duration] */ suspend fun shutDown(duration: Duration) + + /** Send [message] to the listener(s) of a [topic] */ + suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) + + /** Register a [listener] to a [topic] and returns his UUID */ + fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID + + /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */ + fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean } data class ClusterInfo( @@ -83,4 +95,12 @@ interface ClusterLock { fun close() } +class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember) + +interface BlueprintClusterMessageListener<E> { + fun onMessage(message: BluePrintClusterMessage<E>?) +} + +class ClusterJoinedEvent(source: Any) : ApplicationEvent(source) + const val CDS_LOCK_GROUP = "cds-lock" diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt index da5564235..ded017940 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member import com.hazelcast.config.FileSystemYamlConfig import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap +import io.mockk.mockk import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext import org.junit.After import org.junit.Before import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive @@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import java.io.Serializable import java.util.Properties import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -91,6 +95,40 @@ class HazelcastClusterServiceTest { } } + @Test + fun testClusterMessaging() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(1, 2, 3)).toMutableList() + printReachableMembers(bluePrintClusterServiceOne) + testMessageReceived(bluePrintClusterServiceOne) + } + } + + private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) { + val sender = bluePrintClusterServices[0] as HazelcastClusterService + val receiver = bluePrintClusterServices[1] as HazelcastClusterService + val messageSent = "hello world" + var isMessageReceived = false + val uuid = receiver.addBlueprintClusterMessageListener( + BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, + object : BlueprintClusterMessageListener<String> { + override fun onMessage(message: BluePrintClusterMessage<String>?) { + log.info("Message received - ${message?.payload}") + isMessageReceived = messageSent == message?.payload + } + } + ) + + assertNotNull(uuid) + sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent) + delay(1000) + assertTrue(isMessageReceived) + + assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + } + private suspend fun createCluster( ids: List<Int>, joinAsClient: Boolean? = false @@ -117,7 +155,7 @@ class HazelcastClusterServiceTest { properties = properties ) } - val hazelcastClusterService = HazelcastClusterService() + val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true)) hazelcastClusterService.startCluster(clusterInfo) hazelcastClusterService } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt index d107f01e5..3483ce1d7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt @@ -101,10 +101,4 @@ class ComponentFunctionScriptingService( } return scriptComponent } - - suspend fun cleanupInstance(bluePrintContext: BluePrintContext, scriptType: String) { - if (scriptType == BluePrintConstants.SCRIPT_KOTLIN) { - BluePrintScriptsServiceImpl().cleanupInstance(bluePrintContext.rootPath) - } - } } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt index 34eaf6226..a7ef0a8a0 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt @@ -61,8 +61,6 @@ open class ComponentScriptExecutor(private var componentFunctionScriptingService // Handles both script processing and error handling scriptComponentFunction.executeScript(executionServiceInput) - - componentFunctionScriptingService.cleanupInstance(bluePrintRuntimeService.bluePrintContext(), scriptType) } override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { |